package com.offcn.bigdata.spark.p3

import org.apache.spark.{SparkConf, SparkContext}

/*
    当对多个值进行累加的时候，声明多个累加器显然就不是一个很好的解决思路
    自定义累加器
 */
/**
 * Word-count demo driven by a single custom accumulator (`MyAccumulator`)
 * that tallies several target words at once, instead of declaring one
 * built-in accumulator per word.
 */
object _06AccumulatorOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            // Fixed: original referenced _05AccumulatorOps — a copy-paste bug
            // that made this job show up under the wrong application name.
            .setAppName(s"${_06AccumulatorOps.getClass.getSimpleName}")
            .setMaster("local[*]")

        val sc = new SparkContext(conf)

        val list = sc.parallelize(List(
            "a is abstraction in spark is",
            "shared spark that abstraction be is in spark abstraction"
        ))

        // A custom AccumulatorV2 must be registered with the SparkContext
        // before tasks may call add() on it.
        val myAccu = new MyAccumulator
        sc.register(myAccu, "MyAccu")

        // val, not var: the reference is never reassigned.
        val pairs = list.flatMap(_.split("\\s+")).map(word => {
            // One accumulator counts both target words in a single pass.
            if (word == "spark" || word == "abstraction") {
                myAccu.add(word)
            }
            (word, 1)
        })

        val ret = pairs.reduceByKey(_ + _)

        // Transformations are lazy: before any action runs, no task has
        // executed, so the accumulator is still at its zero value.
        println("===action触发前==sparkCount: " + myAccu.value)
        ret.foreach(println)

        // After the action, the driver sees the merged per-task values.
        println("===action触发后==sparkCount: " + myAccu.value)

        // Keeps the JVM alive so the Spark web UI can be inspected
        // before the context shuts down (demo-only; remove in real jobs).
        Thread.sleep(1000000)
        sc.stop()
    }
}
